Processing Pipeline
Overview
The Voice Features processing pipeline transforms uploaded audio files through a series of automated stages, from initial upload to final result storage with complete metadata preservation. The pipeline supports multi-agent execution, comprehensive error handling, and real-time status notifications.
Pipeline Architecture
Core Components
- ConversationProcessingService: Main orchestrator for all processing stages
- ConversationStore: Manages conversation data and metadata
- AutomationStore: Handles automation rules and client-specific configurations
- AutomationRunResults: Stores processing outcomes and agent results
- TranscriptionServiceFactory: Provides transcription services with multiple providers
- LangflowService: Executes LangFlow agents for content analysis
- QueueServiceClient: Manages task queuing and concurrent processing
- SocketNotificationService: Real-time status updates to clients
Pipeline Stages
Stage 1: Upload Confirmation
Status: "Uploading"
Operations
- File Validation: Format, size, and security checks
- Metadata Extraction: Duration, MIME type, and audio properties
- Checksum Calculation: SHA-256 hash for integrity verification
- Storage Allocation: File placement in appropriate storage backend
- Database Record Creation: Initial conversation entry with ConversationSchema
Data Flow
// Input from confirm-upload endpoint
{
"filename": "meeting_recording.mp3",
"mediakey": "audio/2024/01/meeting_abc123.mp3",
"file_checksum": "a1b2c3d4e5f6...",
"duration": "00:30:47",
"size": 15728640,
"contentType": "audio/mpeg",
"folderId": 123
}
// Created ConversationSchema
{
"id": "conv_uuid",
"timestamp": 1705934400,
"duration": "00:30:47",
"transcript": "", // Initially empty
"custom_info": null,
"name": "meeting_recording.mp3",
"user_id": "user_123",
"runresultsreference": null, // Set after processing
"media_key": "audio/2024/01/meeting_abc123.mp3",
"file_checksum": "a1b2c3d4e5f6...",
"folder_id": 123,
"blob_source": "azure"
}
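The file_checksum shown above is a SHA-256 hash, which can be produced with Node's built-in crypto module. A minimal sketch follows; the streaming approach and function name are illustrative, not the service's actual implementation:
import { createHash } from "node:crypto";
import { createReadStream } from "node:fs";

// Stream the file through SHA-256 so large uploads never sit fully in memory
function computeChecksum(path: string): Promise<string> {
  return new Promise((resolve, reject) => {
    const hash = createHash("sha256");
    createReadStream(path)
      .on("data", (chunk) => hash.update(chunk))
      .on("error", reject)
      .on("end", () => resolve(hash.digest("hex")));
  });
}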
Stage 2: Task Queue Management
Status: "Pending" → "Processing"
Queue Operations
- Task Creation: ConversationProcessingTask with all metadata
- Concurrency Control: Maximum concurrent task enforcement (maxConcurrentTasks)
- Priority Processing: FIFO queue with client isolation
- Status Tracking: Real-time task status updates
Task Structure
interface ConversationProcessingTask {
id: string; // Processing ID (UUID)
status: TaskStatus; // PENDING, PROCESSING, COMPLETED, FAILED
type: "CONVERSATION_PROCESSING";
timestamp: number;
clientId: string;
payload: {
mediaKey: string;
folderId: number | null;
source: StorageType; // "azure", "s3", "local"
conversationId: string;
type: string; // "audio"
origin: string; // Upload source
metadata: AudioMetadata;
provider: string; // Transcription provider
serviceModel?: string;
}
}
Notification Events
// Queue status notifications
ConversationNotificationType.PENDING // Task queued
ConversationNotificationType.PREPARE // Task preparation
ConversationNotificationType.START // Processing started
Stage 3: Media Retrieval
Status: "Processing" - Download Phase
Smart Download Strategy
The service prefers short-lived direct download URLs and falls back to blob downloads when the storage backend or transcription provider does not support them:
// Option 1: Direct URL (preferred for supported services)
if (supportsUrl && storageSupportsDirectUrl) {
directUrl = await storageService.generateDownloadUrl({
containerName: this.containerName,
mediaKey,
expiresInMs: 15 * 60_000, // 15 minutes
});
}
// Option 2: Blob download (fallback)
if (!directUrl) {
blobData = await this.retrieveMediaFile(mediaKey, source, clientId, taskId);
}
Storage Service Integration
- Azure Blob Storage: Primary storage with SAS token generation
- S3 Compatible: Alternative cloud storage
- Local Storage: Development and testing environments
- Multi-backend Support: Automatic service selection based on blob_source
Error Handling
- File Not Found: Clear error messages with retry suggestions
- Network Issues: Exponential backoff retry logic (see the sketch after this list)
- Permission Errors: Authentication failure notifications
- Timeout Handling: Configurable download timeouts
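A retry helper for the download phase might look like the following sketch; the attempt count, delays, and the withRetry name are illustrative assumptions, not the service's configured values:
// Retry an async operation with exponential backoff (100ms, 200ms, 400ms, ...)
async function withRetry<T>(fn: () => Promise<T>, maxAttempts = 4): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (attempt >= maxAttempts) throw error; // Out of retries: surface the error
      const delayMs = 100 * 2 ** (attempt - 1);
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
}

// Hypothetical usage around the blob download fallback:
// blobData = await withRetry(() => this.retrieveMediaFile(mediaKey, source, clientId, taskId));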
Stage 4: Transcription Processing
Status: "Transcribing"
Provider-Agnostic Transcription
// Service selection based on provider
const service = this.transcriptionServiceFactory.getService(provider);
// Dual transcription modes
if (directUrl && service.supportsUrlTranscription()) {
  operationLocation = await service.transcribeFromUrl(
    directUrl,
    metadata.mimeType,
    metadata.language
  );
} else {
  operationLocation = await service.transcribe(
    blobData.content,
    metadata.mimeType,
    metadata.language
  );
}
Transcription Workflow
- Service Selection: Choose appropriate transcription provider
- Format Detection: Automatic audio format analysis
- Language Detection: Auto-detect or use specified language
- Asynchronous Processing: Non-blocking transcription with result polling (see the sketch after this list)
- Quality Assessment: Confidence scoring and validation
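A minimal polling loop for the asynchronous step above might look like this sketch of what a provider's getTranscriptionResult could do internally; the interval, deadline, and checkStatus callback are assumptions:
// Poll the transcription operation until it completes or a deadline passes
async function pollForResult(
  checkStatus: () => Promise<{ done: boolean; text?: string }>, // hypothetical provider status call
  intervalMs = 5_000,
  deadlineMs = 15 * 60_000
): Promise<string> {
  const deadline = Date.now() + deadlineMs;
  while (Date.now() < deadline) {
    const { done, text } = await checkStatus();
    if (done) return text ?? "";
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error("Timeout waiting for transcription result");
}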
AudioMetadata Processing
interface AudioMetadata {
filename: string; // Original filename
originalname: string; // User-provided name
mimeType: string; // "audio/mpeg", "audio/wav", etc.
language: string; // "en-US", "es-ES", etc.
}
Notification Flow
ConversationNotificationType.TRANSCRIBE_START // Transcription initiated
ConversationNotificationType.TRANSCRIBE_IN_PROGRESS // Processing ongoing
ConversationNotificationType.TRANSCRIBE // Transcription completed
ConversationNotificationType.TRANSCRIPTION_ERROR // Transcription failed
Timeout and Error Handling
try {
transcription = await service.getTranscriptionResult(operationLocation);
} catch (error) {
if (error.message.includes('Timeout waiting for transcription result')) {
// Handle timeout gracefully - continue with empty transcription
transcription = '';
// Notify client about timeout
} else {
throw error; // Re-throw other errors
}
}
Stage 5: Automation Rule Evaluation
Status: "Processing" - Automation Phase
Rule Matching Process
// Client-specific automation lookup
const automations = await this.automationService.getAutomationsByTypeAndOrigin(
type, // From processing config
origin // From processing config
);
// Client and active-rule filtering, performed in the service layer
const clientAutomations = automations.filter(automation =>
  automation.client_id === clientId && automation.is_active
);
Automation Configuration
interface Automation {
id: number;
name: string; // "Meeting Analysis Rule"
type: string; // "audio"
origin: string; // "meeting_uploads"
agents: string[]; // ["sentiment_agent", "summary_agent"]
last_update: Date;
is_active: boolean;
client_id: string; // Client isolation
}
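An illustrative record, with values made up for demonstration:
const example: Automation = {
  id: 42,
  name: "Meeting Analysis Rule",
  type: "audio",
  origin: "meeting_uploads",
  agents: ["sentiment_agent", "summary_agent"],
  last_update: new Date(),
  is_active: true,
  client_id: "client_abc"
};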
Processing Scenarios
- No Automations Found: Continue with transcription only
- Single Automation: Execute all assigned agents
- Multiple Automations: Process all matching rules
- Empty Transcription: Skip automation processing
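Taken together, the scenarios above reduce to a small guard sequence. A sketch, reusing names from the snippets elsewhere in this document:
// Guard sequence implementing the processing scenarios above
if (!transcription.trim()) {
  // Empty transcription: skip automation processing entirely
} else if (clientAutomations.length === 0) {
  // No automations found: the conversation keeps its transcript only
} else {
  // One or more matching rules: run every assigned agent
  await this.processAutomations(
    clientAutomations, transcription, conversationId, clientId, mediaKey, taskId
  );
}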
Stage 6: Agent Execution
Status: "Running Flows"
Multi-Agent Processing Architecture
async processAutomations(
automations: Automation[],
transcription: string,
conversationId: string,
clientId: string,
mediaKey: string,
taskId: string
): Promise<{
processedResults: ProcessedFlowResult[];
failedResults: ProcessedFlowResult[];
finalOutput: string;
}>
Agent Input Standardization
// Standard input format for all agents
const agentInput = {
transcription, // Complete transcribed text
conversationId, // For context and tracking
// Additional metadata available to agents
};
Agent Execution Loop
Agents execute one at a time; each invocation is isolated so a single failure cannot abort the remaining agents:
for (const automation of automations) {
  for (const flowId of automation.agents) {
    try {
      // Each agent executes independently
      const response = await this.langflowService.runAudioFlow(flowId, agentInput);
      // Extract text from the nested LangFlow response structure
      const rawText = this.extractTextFromResponse(response);
      if (rawText) {
        processedResults.push({ flowId, text: rawText, timestamp: Date.now() });
      }
    } catch (error) {
      failedResults.push({
        flowId,
        error: error instanceof Error ? error.message : String(error),
        timestamp: Date.now()
      });
    }
  }
}
LangFlow Response Parsing
The service handles complex LangFlow response structures:
// Multiple possible response paths
if (response.data?.[0]?.results?.message?.text) {
rawText = response.data[0].results.message.text;
} else if (response.data?.[0]?.outputs?.message?.message?.text) {
rawText = response.data[0].outputs.message.message.text;
} else if (response.data?.outputs?.[0]?.outputs?.[0]?.results?.message?.data?.text) {
rawText = response.data.outputs[0].outputs[0].results.message.data.text;
}
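One way to consolidate those paths is a single helper that returns the first match; this is a sketch, not necessarily how extractTextFromResponse is actually written:
// Try each known LangFlow response shape in order and return the first hit
function extractTextFromResponse(response: any): string | undefined {
  return (
    response.data?.[0]?.results?.message?.text ??
    response.data?.[0]?.outputs?.message?.message?.text ??
    response.data?.outputs?.[0]?.outputs?.[0]?.results?.message?.data?.text
  );
}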
Agent Result Collection
interface ProcessedFlowResult {
flowId: string; // Agent identifier
text?: string; // Successful output
timestamp: number; // Completion time
error?: string; // Error message if failed
}
interface TaskResults {
flows: ProcessedFlowResult[]; // Successful executions
failed_flows: ProcessedFlowResult[]; // Failed executions
finalOutput: string; // Combined result or error summary
}
Error Isolation and Recovery
- Individual Agent Failures: Don't affect other agents
- Partial Success Handling: Continue with successful results
- Error Aggregation: Collect all errors for debugging
- Timeout Management: Per-agent timeout limits
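Per-agent timeouts can be enforced by racing the flow call against a timer. A sketch with an assumed 60-second limit; withTimeout is a hypothetical helper, not a documented API:
// Reject if the agent does not respond within timeoutMs
function withTimeout<T>(promise: Promise<T>, timeoutMs = 60_000): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Agent timed out after ${timeoutMs}ms`)), timeoutMs);
  });
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Hypothetical usage inside the agent loop:
// const response = await withTimeout(this.langflowService.runAudioFlow(flowId, agentInput));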
Stage 7: Result Storage and Completion
Status: "Completed"
AutomationRunResult Creation
const runResult: AutomationRunResult = {
id: uuidv4(), // Unique result identifier
file_reference: mediaKey, // Link to original media
task_results: {
flows: processedResults, // Successful agent outputs
failed_flows: failedResults, // Failed agent attempts
finalOutput: combinedOutput // Processed or error summary
},
created_at: new Date()
};
// Store results for future access
await this.automationRunResults.insert(runResult);
Conversation Update
// Update conversation with all processing results
const updateData = {
transcript: transcription, // Always update transcript
runresultsreference: runResult.id, // Link to processing results
custom_info: this.parseProcessingOutput(finalOutput) // Structured data
};
await this.conversationService.updateConversation(conversationId, updateData);
Custom Info Processing
// Parse agent output into structured data
if (finalOutput.includes('custom-info')) {
updateData.custom_info = parseJsonDynamic(finalOutput) || { "": "" };
} else {
updateData.custom_info = { "": "" }; // Default empty object
}
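parseJsonDynamic itself is not shown in this document; a tolerant parser along these lines would satisfy the call site above (this is an assumption about its behavior):
// Extract and parse the first JSON object embedded in free-form agent output;
// return null when nothing parseable is found (the caller falls back to { "": "" })
function parseJsonDynamic(output: string): Record<string, unknown> | null {
  const start = output.indexOf("{");
  const end = output.lastIndexOf("}");
  if (start === -1 || end <= start) return null;
  try {
    return JSON.parse(output.slice(start, end + 1));
  } catch {
    return null;
  }
}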
Notification System
Real-Time Status Updates
The pipeline provides comprehensive notifications throughout processing:
enum ConversationNotificationType {
PENDING = "pending",
PREPARE = "prepare",
START = "start",
DOWNLOAD = "download",
TRANSCRIBE_START = "transcribe_start",
TRANSCRIBE_IN_PROGRESS = "transcribe_in_progress",
TRANSCRIBE = "transcribe",
FETCH_AUTOMATIONS = "fetching-automations-up",
AUTOMATION_PROCESSING = "automation-processing",
RUNNING_FLOWS = "running-flows",
SAVE_DB = "save-db",
COMPLETED = "completed",
// Error notifications
FILE_UPLOAD_ERROR = "file-upload-error",
TRANSCRIPTION_ERROR = "transcription-error",
AUTOMATION_FETCH_ERROR = "automation-fetch-error",
AUTOMATION_ERROR = "automation-error"
}
Notification Data Structure
interface ConversationProcessingNotificationData {
type: ConversationNotificationType;
status: EventStatus; // SUCCESS, FAIL, WARNING
resourceId: string; // mediaKey
conversationId?: string;
timestamp: Date;
taskId?: string;
extra?: {
reason?: string;
processing_completed?: boolean;
is_processing?: boolean;
error?: string;
context?: string;
};
}
Error Handling and Recovery
Comprehensive Error Management
type ConversationProcessingErrorType =
| "fetch" // Database retrieval errors
| "transcribe" // Transcription service errors
| "automation" // Automation rule errors
| "service-restart" // System restart during processing
| "capacity" // Resource limitation errors
| "download" // File retrieval errors
| "upload" // File upload errors
| "save_db" // Database save errors
| "running_flows" // Agent execution errors
| "automation_processing" // Automation processing errors
| "aborted" // User-initiated cancellation
| "unhandled_error" // Unexpected errors
| "other"; // Miscellaneous errors
Error Recovery Strategies
Graceful Degradation
// Empty transcription handling
if (!transcription.trim()) {
await this.conversationService.updateConversation(conversationId, {
transcript: '',
custom_info: {
status: 'EMPTY_TRANSCRIPTION',
user_message: "We couldn't detect any speech in this recording."
}
});
// Complete task successfully with warning
}
Partial Failure Handling
// Automation error with successful transcription
try {
await this.processAndSaveAutomations(/* ... */);
} catch (automationError) {
await this.conversationService.updateConversation(conversationId, {
transcript: transcription, // Keep successful transcription
custom_info: {
error: 'Automation processing failed',
status: 'AUTOMATION_ERROR',
user_message: 'Transcript is available, but analysis failed.'
}
});
// Complete with warning status
}
Task Lifecycle Management
Boot Recovery
// Handle tasks interrupted by service restart
private async updateStatusOnBoot() {
  const interruptedTasks = await this.conversationProcessingStore
    .getProcessingByStatus(TaskStatus.PROCESSING);
  // Mark each interrupted task as failed with a service-restart error
  const promises = interruptedTasks.map(task =>
    this.conversationProcessingStore.updateConversationProcessingStatus(
      task.processing_id, {
        status: TaskStatus.FAILED,
        error: {
          message: 'Service restarted during processing',
          type: 'service-restart'
        }
      }
    )
  );
  await Promise.all(promises);
}
Queue Cleanup
// Clean abandoned tasks on startup
private async cleanupQueue() {
const tasksCount = await this.taskManager.cleanupQueueByType("CONVERSATION_PROCESSING");
logger.debug(`Cleaned up ${tasksCount} tasks from queue`);
}
Performance and Scalability
Concurrency Control
// Configurable concurrent processing
private maxConcurrentTasks: number = 1;
private activeTasksCount: number = 0;
// Dynamic concurrency adjustment
setMaxConcurrentTasks(value: number) {
this.maxConcurrentTasks = Math.max(1, value);
}
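The gate itself is a counter check before dispatch. A sketch of how the dequeue loop might honor it; queue.dequeue and processTask are assumed names, not documented APIs:
// Dispatch queued tasks only while capacity remains
private async drainQueue() {
  while (this.activeTasksCount < this.maxConcurrentTasks) {
    const task = await this.queue.dequeue("CONVERSATION_PROCESSING"); // hypothetical
    if (!task) break; // Queue empty
    this.activeTasksCount++;
    this.processTask(task)
      .catch((error) => logger.error(`Task ${task.id} failed`, error))
      .finally(() => {
        this.activeTasksCount--;
        void this.drainQueue(); // Re-check capacity when a slot frees up
      });
  }
}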
Resource Management
- Memory Optimization: Stream processing for large files
- Storage Efficiency: Temporary URL generation for direct access
- Network Optimization: Choose best download method per file
- Database Optimization: Batch operations and proper indexing
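For the memory point in the list above, large downloads can be piped straight to disk instead of buffered in memory. A sketch using Node's stream utilities; the function name and temp-file destination are illustrative:
import { pipeline } from "node:stream/promises";
import { createWriteStream } from "node:fs";
import { Readable } from "node:stream";

// Stream an HTTP response body to a file without holding it in memory
async function downloadToFile(url: string, destPath: string): Promise<void> {
  const response = await fetch(url);
  if (!response.ok || !response.body) {
    throw new Error(`Download failed: ${response.status}`);
  }
  // The cast bridges web-stream and Node-stream typings
  await pipeline(Readable.fromWeb(response.body as any), createWriteStream(destPath));
}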
Monitoring and Observability
// Performance timing
console.time(`conversation-processing-${mediaKey}`);
// ... processing ...
console.timeEnd(`conversation-processing-${mediaKey}`);
// Detailed logging at each stage
logger.debug(`Starting task: ${task.id} for media: ${mediaKey} (${activeTasksCount}/${maxConcurrentTasks})`);
Integration Points
Storage Backends
- Multi-provider Support: Azure, S3, local storage
- Automatic Selection: Based on blob_source configuration
- Fallback Mechanisms: Graceful degradation between storage types
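Backend selection keyed on blob_source could be a simple lookup. A sketch; the StorageService interface shown here is an assumption inferred from the generateDownloadUrl call earlier in this document:
type StorageType = "azure" | "s3" | "local";

interface StorageService {
  generateDownloadUrl(opts: { containerName: string; mediaKey: string; expiresInMs: number }): Promise<string>;
}

// Pick the storage implementation that matches the conversation's blob_source
function getStorageService(
  source: StorageType,
  services: Record<StorageType, StorageService>
): StorageService {
  const service = services[source];
  if (!service) throw new Error(`Unsupported storage backend: ${source}`);
  return service;
}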
Transcription Services
- Provider Abstraction: Pluggable transcription services
- Capability Detection: URL vs blob transcription support
- Quality Optimization: Best method selection per service
LangFlow Integration
- Agent Orchestration: Multiple agent execution per automation
- Response Parsing: Robust output extraction from complex responses
- Error Isolation: Individual agent failure handling
The Voice Features processing pipeline provides enterprise-grade reliability with comprehensive error handling, real-time monitoring, and graceful degradation for all failure scenarios.